1 | public class KafkaConsumer<K,V> extends java.lang.Object implements Consumer<K,V> |
用于消费记录。
显式处理broker失效、适应分区在集群内转移,允许通过消费组负载均衡。
为了获取数据,维护与broker的连接。关闭失败将导致连接泄漏。
不是线程安全的,多线程使用详见Multi-threaded Processing
1 跨版本兼容
能够与版本>=0.10.0的broker通信,但不支持某些特性(抛出异常UnsupportedVersionException)。
2 偏移和消费位置
offset唯一标识记录在每个分区中的位置。
两个重要的相关概念:
- 消费位置:offset指示了即将消费的下一位置。每当消费者接受记录(调用poll(Duration))后自动后移。
- 提交位置:最近被安全存储的偏移位置。如果进程失效或重启,消费者将从该偏移恢复。消费者可以周期性提交偏移,或者人工调用API设置(commitSync和commitAsync)
者两种位置是消费者可以控制消费完成的时机。
3 消费组和主题订阅
Kafka使用消费组实现伸缩性和容错。消费进程可以在单一或集群机器上运行。拥有相同group.id的消费者视为同一组。
每个消费者可以通过Subscribe API动态设置订阅的主题。主题的分区在同一消费组中的所有订阅该主题的消费者中负载均衡。
某个消费者失效后,原先分配给它的分区将重新分配给同一组中的其他消费者。同理,添加新的消费者时,将其他消费者上的分区均分给它。这个过程叫rebalancing。rebalancing同样用于主题的变化。当主题增加新的分区,或者创建了匹配subscribed regex 的主题,消费组将自动周期性刷新,并分配新的分区给成员。
消费组可以视为具有多个消费进程的单一逻辑订阅者。由于消费者相当廉价,Kafka支持在没有数据冗余的情况下任意多个消费组。
消费者可以通过 ConsumerRebalanceListener做一些应用级别的逻辑,如状态清理、人为提交偏移等。详见Storing Offsets Outside Kafka。
4 检测消费者失效
当消费者调用poll(Duration)后,消费者将自动加入消费组。poll API用于确保消费者存活。只要继续调用,消费者就会被包含在组内。底层逻辑是消费者定期发送心跳包给服务器,当接收时长超过session.timeout.ms,消费者将被判定为失效,其拥有的分区将被分发。
消费者有时会遇到“livelock”:只发送心跳包,但不作为。Kafka通过限制poll调用的时间间隔max.poll.interval.ms来避免。超过最大间隔后,调用commitSync()时抛出异常CommitFailedException。这个机制确保了只有活跃的组成员才能提交偏移。因此只有不断调用poll才能留在组内。
消费者使用以下两种配置控制poll循环行为:
max.poll.interval.ms
由于rebalance是在poll中使用,调整检测时间将同样调整rebalance时间,可能延缓对消费者失效的处理。
max.poll.records
限制单次调用poll返回的记录数量。可以间接地调整poll时间间隔。
由于消息处理的时间差异难以预测,两种方式都有效。
为了让消费者在消息处理的同时调用poll,建议将消息处理移动到另一个线程中。需要确保提交的偏移不能在实际位置之前。通常需要关闭自动提交,改为手动提交。只有确认处理线程处理完成后才能提交。在处理线程返回之前,需要pause分区,以使从poll没有新的记录被接收。
5 用例
(1) 自动偏移提交
1 | Properties props = new Properties(); |
(2) 人工偏移提交
允许人工确认处理状态,用于具有复杂处理逻辑的场景,实现”at least once”语义
1 | Properties props = new Properties(); |
注意:自动提交也可以实现“at least once”语义。在下一次调用poll()或关闭消费者时才认为上一次处理完成。
人工提交也可以逐条记录提交,实现更细粒度的提交。
1 | // 逐个分区处理记录 |
(3) 人工分区分配
当处理过程涉及本地信息,或者处理过程本身就是高可用的,不需要Kafka再提供时。
1 | String topic = "foo"; |
注意:
group.id仍可用于偏移提交
只能通过assign()重新指定分区
为了避免偏移提交冲突,确保消费者间组ID唯一
不能在主题订阅时混淆动态分配和人工分配
(4) 外部存储偏移
http://kafka.apache.org/20/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html